/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.core.client.impl;

import com.chinamobile.cmss.lakehouse.common.kubernetes.K8sModelConstant;
import com.chinamobile.cmss.lakehouse.common.utils.WaitUtil;
import com.chinamobile.cmss.lakehouse.core.client.IKubernetesClient;
import com.chinamobile.cmss.lakehouse.core.client.InformerClient;
import com.chinamobile.cmss.lakehouse.core.client.InformerCreateEvent;
import com.chinamobile.cmss.lakehouse.core.client.InformerCreateEvent.CreateStatus;
import com.chinamobile.cmss.lakehouse.core.client.InformerDeleteEvent;
import com.chinamobile.cmss.lakehouse.core.client.InformerDeleteEvent.DeleteStatus;
import com.chinamobile.cmss.lakehouse.core.client.InformerEventType;
import com.chinamobile.cmss.lakehouse.core.client.InformerException;
import com.chinamobile.cmss.lakehouse.core.client.InformerUtils;
import com.chinamobile.cmss.lakehouse.core.config.KubernetesConfiguration;

import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import io.kubernetes.client.custom.V1Patch;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.models.V1ClusterRoleBinding;
import io.kubernetes.client.openapi.models.V1ConfigMap;
import io.kubernetes.client.openapi.models.V1DeleteOptions;
import io.kubernetes.client.openapi.models.V1Deployment;
import io.kubernetes.client.openapi.models.V1Namespace;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.openapi.models.V1StatefulSet;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@VisibleForTesting
@Component
public class InformerClientImpl implements InformerClient {

    /** Delete options passed to the delete calls below; only the API version is set on them. */
    private static final V1DeleteOptions DELETE_OPTIONS =
        new V1DeleteOptions().apiVersion(K8sModelConstant.API_VERSION);

    /** Client that issues the actual Kubernetes API calls; assigned in the constructor. */
    private IKubernetesClient kubernetesClient;

    /**
     * Field-injected factory that supplies the informer futures and holds the pending
     * create/delete event containers used by the methods below.
     */
    @Autowired
    private InformerFactoryImpl factory;

    /**
     * Creates the informer client around the Kubernetes API client it delegates to.
     *
     * @param kubernetesClient client used for the create, delete and patch calls issued by this class
     */
    @Autowired
    public InformerClientImpl(IKubernetesClient kubernetesClient) {
        this.kubernetesClient = kubernetesClient;
    }

    /**
     * Creates a namespace and waits on the matching create event (presumably released by the
     * namespace informer's handler).
     *
     * <p>A pending create event is registered under the namespace name (or reused if one is
     * already pending) before the API call is issued, and is always removed again before this
     * method returns.
     *
     * @param namespace namespace definition; only its metadata name is passed to the Kubernetes client
     * @return the result carried by the event when it ends in {@code CREATE_SUCCESS} with a non-null
     *     result, otherwise {@code null}
     * @throws InformerException if the API call fails, the wait is interrupted, the informer future
     *     completes exceptionally, or the event latch is not released within
     *     {@code KubernetesConfiguration.informerWaitMs} milliseconds
     */
    @Override
    public V1Namespace createNamespace(V1Namespace namespace) throws InformerException {
        CompletableFuture<Void> nsFuture = factory.getOrAddNamespaceInformer();
        String namespaceName = namespace.getMetadata().getName();

        InformerCreateEvent event = factory.createEventContainers.computeIfAbsent(namespaceName,
            (nsName) -> InformerCreateEvent.create(namespaceName, InformerEventType.NAMESPACE));

        AtomicBoolean finished = new AtomicBoolean(false);
        try {
            // thenAccept only runs the callback inline when the informer future is already done;
            // join() so the checks below always observe the callback's outcome instead of racing it.
            nsFuture.thenAccept(nsInformer -> {
                try {
                    // create namespace and wait for success
                    kubernetesClient.createNamespace(namespaceName);
                    boolean result =
                        event.getLatch().await(KubernetesConfiguration.informerWaitMs, TimeUnit.MILLISECONDS);
                    finished.set(result);
                } catch (ApiException | InterruptedException e) {
                    if (e instanceof InterruptedException) {
                        // restore the interrupt flag so code further up the stack can still see it
                        Thread.currentThread().interrupt();
                    }
                    log.error("create namespace {} error with response {}", namespaceName,
                        e.getMessage(), e);
                    errorCreate(event, e);
                }
            }).exceptionally(exception -> {
                errorCreate(event, exception);
                return null;
            }).join();

            if (event.getException() != null) {
                log.error("Failed to create namespace {}", namespaceName);
                throw event.getException();
            }

            if (!finished.get()) {
                throw new InformerException(String.format("Failed completing create namespace %s within %s",
                    namespaceName, KubernetesConfiguration.informerWaitMs));
            }

            if (CreateStatus.CREATE_SUCCESS.equals(event.getStatus()) && event.getResult() != null) {
                log.info("Create namespace {} successfully", event.getName());
                return (V1Namespace) event.getResult();
            }
        } finally {
            // always drop the pending event so a later call for the same name starts fresh
            factory.createEventContainers.remove(namespaceName);
        }
        return null;
    }

    /**
     * Deletes a namespace and waits on the matching delete event (presumably released by the
     * namespace informer's handler).
     *
     * <p>A pending delete event is registered under the namespace name (or reused if one is
     * already pending) before the API call is issued, and is always removed again before this
     * method returns.
     *
     * @param namespace name of the namespace to delete
     * @return the result carried by the event when it ends in {@code DELETE_SUCCESS} with a non-null
     *     result, otherwise {@code null}
     * @throws InformerException if the API call fails, the wait is interrupted, the informer future
     *     completes exceptionally, or the event latch is not released within
     *     {@code KubernetesConfiguration.informerWaitMs} milliseconds
     */
    @Override
    public V1Namespace deleteNamespace(String namespace) throws InformerException {
        CompletableFuture<Void> nsFuture = factory.getOrAddNamespaceInformer();

        InformerDeleteEvent event = factory.deleteEventContainers.computeIfAbsent(namespace,
            (nsName) -> InformerDeleteEvent.create(namespace, InformerEventType.NAMESPACE));

        AtomicBoolean finished = new AtomicBoolean(false);
        try {
            // thenAccept only runs the callback inline when the informer future is already done;
            // join() so the checks below always observe the callback's outcome instead of racing it.
            nsFuture.thenAccept(nsInformer -> {
                try {
                    // delete namespace and wait for success
                    kubernetesClient.deleteNamespace(namespace, DELETE_OPTIONS);
                    boolean result =
                        event.getLatch().await(KubernetesConfiguration.informerWaitMs, TimeUnit.MILLISECONDS);
                    finished.set(result);
                } catch (ApiException | InterruptedException e) {
                    if (e instanceof InterruptedException) {
                        // restore the interrupt flag so code further up the stack can still see it
                        Thread.currentThread().interrupt();
                    }
                    log.error("delete namespace {} error with response {}", namespace,
                        e.getMessage(), e);
                    errorDelete(event, e);
                }
            }).exceptionally(exception -> {
                errorDelete(event, exception);
                return null;
            }).join();

            if (event.getException() != null) {
                log.error("Failed to delete namespace {}", namespace);
                throw event.getException();
            }

            if (!finished.get()) {
                throw new InformerException(String.format("Failed completing delete namespace %s within %s",
                    namespace, KubernetesConfiguration.informerWaitMs));
            }

            if (DeleteStatus.DELETE_SUCCESS.equals(event.getStatus()) && event.getResult() != null) {
                log.info("Delete namespace {} successfully", event.getName());
                return (V1Namespace) event.getResult();
            }
        } finally {
            // always drop the pending event so a later call for the same name starts fresh
            factory.deleteEventContainers.remove(namespace);
        }
        return null;
    }

    /**
     * Creates a cluster role binding and waits on the matching create event (presumably released
     * by the cluster-role-binding informer's handler).
     *
     * <p>A pending create event is registered under the binding's metadata name (or reused if one
     * is already pending) before the API call is issued, and is always removed again before this
     * method returns.
     *
     * @param clusterRoleBinding binding definition passed to the Kubernetes client
     * @return the result carried by the event when it ends in {@code CREATE_SUCCESS} with a non-null
     *     result, otherwise {@code null}
     * @throws InformerException if the API call fails, the wait is interrupted, the informer future
     *     completes exceptionally, or the event latch is not released within
     *     {@code KubernetesConfiguration.informerWaitMs} milliseconds
     */
    @Override
    public V1ClusterRoleBinding createClusterRoleBinding(V1ClusterRoleBinding clusterRoleBinding)
        throws InformerException {
        CompletableFuture<Void> crbFuture = factory.getOrAddClusterRoleBindingInformer();
        String crbName = clusterRoleBinding.getMetadata().getName();

        InformerCreateEvent event = factory.createEventContainers.computeIfAbsent(crbName,
            (clusterRBName) -> InformerCreateEvent.create(crbName,
                InformerEventType.CLUSTER_ROLE_BINDING));

        AtomicBoolean finished = new AtomicBoolean(false);
        try {
            // thenAccept only runs the callback inline when the informer future is already done;
            // join() so the checks below always observe the callback's outcome instead of racing it.
            crbFuture.thenAccept(crbInformer -> {
                try {
                    // create clusterRoleBinding and wait for success
                    kubernetesClient.createClusterRoleBinding(clusterRoleBinding);
                    boolean result =
                        event.getLatch().await(KubernetesConfiguration.informerWaitMs, TimeUnit.MILLISECONDS);
                    finished.set(result);
                } catch (ApiException | InterruptedException e) {
                    if (e instanceof InterruptedException) {
                        // restore the interrupt flag so code further up the stack can still see it
                        Thread.currentThread().interrupt();
                    }
                    log.error("create clusterRoleBinding {} error with response {}", crbName,
                        e.getMessage(), e);
                    errorCreate(event, e);
                }
            }).exceptionally(exception -> {
                errorCreate(event, exception);
                return null;
            }).join();

            if (event.getException() != null) {
                log.error("Failed to create clusterRoleBinding {}", crbName);
                throw event.getException();
            }

            if (!finished.get()) {
                throw new InformerException(
                    String.format("Failed completing create clusterRoleBinding %s within %s", crbName,
                        KubernetesConfiguration.informerWaitMs));
            }

            if (CreateStatus.CREATE_SUCCESS.equals(event.getStatus()) && event.getResult() != null) {
                log.info("Create clusterRoleBinding {} successfully", event.getName());
                return (V1ClusterRoleBinding) event.getResult();
            }
        } finally {
            // always drop the pending event so a later call for the same name starts fresh
            factory.createEventContainers.remove(crbName);
        }
        return null;
    }

    /**
     * Deletes a cluster role binding and waits on the matching delete event (presumably released
     * by the cluster-role-binding informer's handler).
     *
     * <p>A pending delete event is registered under the binding name (or reused if one is already
     * pending) before the API call is issued, and is always removed again before this method
     * returns.
     *
     * @param clusterRoleBinding name of the cluster role binding to delete
     * @return the result carried by the event when it ends in {@code DELETE_SUCCESS} with a non-null
     *     result, otherwise {@code null}
     * @throws InformerException if the API call fails, the wait is interrupted, the informer future
     *     completes exceptionally, or the event latch is not released within
     *     {@code KubernetesConfiguration.informerWaitMs} milliseconds
     */
    @Override
    public V1ClusterRoleBinding deleteClusterRoleBinding(String clusterRoleBinding)
        throws InformerException {
        CompletableFuture<Void> crbFuture = factory.getOrAddClusterRoleBindingInformer();

        InformerDeleteEvent event = factory.deleteEventContainers.computeIfAbsent(clusterRoleBinding,
            (crbName) -> InformerDeleteEvent.create(clusterRoleBinding,
                InformerEventType.CLUSTER_ROLE_BINDING));

        AtomicBoolean finished = new AtomicBoolean(false);
        try {
            // thenAccept only runs the callback inline when the informer future is already done;
            // join() so the checks below always observe the callback's outcome instead of racing it.
            crbFuture.thenAccept(crbInformer -> {
                try {
                    // delete clusterRoleBinding and wait for success
                    kubernetesClient.deleteClusterRoleBinding(clusterRoleBinding, DELETE_OPTIONS);
                    boolean result =
                        event.getLatch().await(KubernetesConfiguration.informerWaitMs, TimeUnit.MILLISECONDS);
                    finished.set(result);
                } catch (ApiException | InterruptedException e) {
                    if (e instanceof InterruptedException) {
                        // restore the interrupt flag so code further up the stack can still see it
                        Thread.currentThread().interrupt();
                    }
                    log.error("delete clusterRoleBinding {} error with response {}", clusterRoleBinding,
                        e.getMessage(), e);
                    errorDelete(event, e);
                }
            }).exceptionally(exception -> {
                errorDelete(event, exception);
                return null;
            }).join();

            if (event.getException() != null) {
                log.error("Failed to delete clusterRoleBinding {}", clusterRoleBinding);
                throw event.getException();
            }

            if (!finished.get()) {
                throw new InformerException(
                    String.format("Failed completing delete clusterRoleBinding %s within %s",
                        clusterRoleBinding, KubernetesConfiguration.informerWaitMs));
            }

            if (DeleteStatus.DELETE_SUCCESS.equals(event.getStatus()) && event.getResult() != null) {
                log.info("Delete clusterRoleBinding {} successfully", event.getName());
                return (V1ClusterRoleBinding) event.getResult();
            }
        } finally {
            // always drop the pending event so a later call for the same name starts fresh
            factory.deleteEventContainers.remove(clusterRoleBinding);
        }
        return null;
    }

    /**
     * Creates a service in the given namespace and waits on the matching create event (presumably
     * released by the service informer's handler).
     *
     * <p>A pending create event is registered under the key built by {@code InformerUtils.wrap}
     * from namespace and service name (or reused if one is already pending) before the API call
     * is issued, and is always removed again before this method returns.
     *
     * @param namespace namespace to create the service in
     * @param service service definition; its metadata name identifies the pending event
     * @return the result carried by the event when it ends in {@code CREATE_SUCCESS} with a non-null
     *     result, otherwise {@code null}
     * @throws InformerException if the API call fails, the wait is interrupted, the informer future
     *     completes exceptionally, or the event latch is not released within
     *     {@code KubernetesConfiguration.informerWaitMs} milliseconds
     */
    @Override
    public V1Service createService(String namespace, V1Service service) throws InformerException {
        CompletableFuture<Void> svcFuture = factory.getOrAddServiceInformer();
        String serviceName = service.getMetadata().getName();
        String wrapServiceName = InformerUtils.wrap(namespace, serviceName);

        InformerCreateEvent event = factory.createEventContainers.computeIfAbsent(wrapServiceName,
            (sName) -> InformerCreateEvent.create(wrapServiceName, InformerEventType.SERVICE));

        AtomicBoolean finished = new AtomicBoolean(false);
        try {
            // thenAccept only runs the callback inline when the informer future is already done;
            // join() so the checks below always observe the callback's outcome instead of racing it.
            svcFuture.thenAccept(svcInformer -> {
                try {
                    // create service and wait for success
                    kubernetesClient.createService(namespace, service);
                    boolean result =
                        event.getLatch().await(KubernetesConfiguration.informerWaitMs, TimeUnit.MILLISECONDS);
                    finished.set(result);
                } catch (ApiException | InterruptedException e) {
                    if (e instanceof InterruptedException) {
                        // restore the interrupt flag so code further up the stack can still see it
                        Thread.currentThread().interrupt();
                    }
                    log.error("create service {} error with response {}", serviceName,
                        e.getMessage(), e);
                    errorCreate(event, e);
                }
            }).exceptionally(exception -> {
                errorCreate(event, exception);
                return null;
            }).join();

            if (event.getException() != null) {
                log.error("Failed to create service {} in namespace {}", serviceName, namespace);
                throw event.getException();
            }

            if (!finished.get()) {
                throw new InformerException(String.format("Failed completing create service %s within %s",
                    serviceName, KubernetesConfiguration.informerWaitMs));
            }

            if (CreateStatus.CREATE_SUCCESS.equals(event.getStatus()) && event.getResult() != null) {
                log.info("Create service {} in namespace {} successfully", serviceName, namespace);
                return (V1Service) event.getResult();
            }
        } finally {
            // always drop the pending event so a later call for the same key starts fresh
            factory.createEventContainers.remove(wrapServiceName);
        }
        return null;
    }

    /**
     * Deletes a service from the given namespace and waits on the matching delete event
     * (presumably released by the service informer's handler).
     *
     * <p>A pending delete event is registered under the key built by {@code InformerUtils.wrap}
     * from namespace and service name (or reused if one is already pending) before the API call
     * is issued, and is always removed again before this method returns.
     *
     * @param namespace namespace the service lives in
     * @param serviceName name of the service to delete
     * @return the result carried by the event when it ends in {@code DELETE_SUCCESS} with a non-null
     *     result, otherwise {@code null}
     * @throws InformerException if the API call fails, the wait is interrupted, the informer future
     *     completes exceptionally, or the event latch is not released within
     *     {@code KubernetesConfiguration.informerWaitMs} milliseconds
     */
    @Override
    public V1Service deleteService(String namespace, String serviceName) throws InformerException {
        CompletableFuture<Void> svcFuture = factory.getOrAddServiceInformer();
        String wrapServiceName = InformerUtils.wrap(namespace, serviceName);

        InformerDeleteEvent event = factory.deleteEventContainers.computeIfAbsent(wrapServiceName,
            (sName) -> InformerDeleteEvent.create(wrapServiceName, InformerEventType.SERVICE));

        AtomicBoolean finished = new AtomicBoolean(false);
        try {
            // thenAccept only runs the callback inline when the informer future is already done;
            // join() so the checks below always observe the callback's outcome instead of racing it.
            svcFuture.thenAccept(svcInformer -> {
                try {
                    // delete service and wait for success
                    kubernetesClient.deleteService(namespace, serviceName, DELETE_OPTIONS);
                    boolean result =
                        event.getLatch().await(KubernetesConfiguration.informerWaitMs, TimeUnit.MILLISECONDS);
                    finished.set(result);
                } catch (ApiException | InterruptedException e) {
                    if (e instanceof InterruptedException) {
                        // restore the interrupt flag so code further up the stack can still see it
                        Thread.currentThread().interrupt();
                    }
                    log.error("delete service {} error with response {}", serviceName,
                        e.getMessage(), e);
                    errorDelete(event, e);
                }
            }).exceptionally(exception -> {
                errorDelete(event, exception);
                return null;
            }).join();

            if (event.getException() != null) {
                log.error("Failed to delete service {} in namespace {}", serviceName, namespace);
                throw event.getException();
            }

            if (!finished.get()) {
                throw new InformerException(String.format("Failed completing delete service %s within %s",
                    serviceName, KubernetesConfiguration.informerWaitMs));
            }

            if (DeleteStatus.DELETE_SUCCESS.equals(event.getStatus()) && event.getResult() != null) {
                log.info("Delete service {} in namespace {} successfully", serviceName, namespace);
                return (V1Service) event.getResult();
            }
        } finally {
            // always drop the pending event so a later call for the same key starts fresh
            factory.deleteEventContainers.remove(wrapServiceName);
        }
        return null;
    }

    /**
     * Creates a stateful set in the given namespace and waits on the matching create event
     * (presumably released by the stateful-set informer's handler).
     *
     * <p>A pending create event is registered under the key built by {@code InformerUtils.wrap}
     * from namespace and stateful set name (or reused if one is already pending) before the API
     * call is issued, and is always removed again before this method returns.
     *
     * @param namespace namespace to create the stateful set in
     * @param statefulset stateful set definition; its metadata name identifies the pending event
     * @return the result carried by the event when it ends in {@code CREATE_SUCCESS} with a non-null
     *     result, otherwise {@code null}
     * @throws InformerException if the API call fails, the wait is interrupted, the informer future
     *     completes exceptionally, or the event latch is not released within
     *     {@code KubernetesConfiguration.informerWaitMs} milliseconds
     */
    @Override
    public V1StatefulSet createStatefulSet(String namespace, V1StatefulSet statefulset)
        throws InformerException {
        CompletableFuture<Void> ssFuture = factory.getOrAddStatefulSetInformer();
        String statefulsetName = statefulset.getMetadata().getName();
        String wrapStatefulSetName = InformerUtils.wrap(namespace, statefulsetName);

        InformerCreateEvent event = factory.createEventContainers.computeIfAbsent(wrapStatefulSetName,
            (ssName) -> InformerCreateEvent.create(wrapStatefulSetName, InformerEventType.STATEFUL_SET));

        AtomicBoolean finished = new AtomicBoolean(false);
        try {
            // thenAccept only runs the callback inline when the informer future is already done;
            // join() so the checks below always observe the callback's outcome instead of racing it.
            ssFuture.thenAccept(ssInformer -> {
                try {
                    // create statefulset and wait for success
                    kubernetesClient.createStatefulSet(namespace, statefulset);
                    boolean result =
                        event.getLatch().await(KubernetesConfiguration.informerWaitMs, TimeUnit.MILLISECONDS);
                    finished.set(result);
                } catch (ApiException | InterruptedException e) {
                    if (e instanceof InterruptedException) {
                        // restore the interrupt flag so code further up the stack can still see it
                        Thread.currentThread().interrupt();
                    }
                    log.error("create statefulset {} error with response {}", statefulsetName,
                        e.getMessage(), e);
                    errorCreate(event, e);
                }
            }).exceptionally(exception -> {
                errorCreate(event, exception);
                return null;
            }).join();

            if (event.getException() != null) {
                log.error("Failed to create statefulset {} in namespace {}", statefulsetName, namespace);
                throw event.getException();
            }

            if (!finished.get()) {
                throw new InformerException(
                    String.format("Failed completing create statefulset %s within %s", statefulsetName,
                        KubernetesConfiguration.informerWaitMs));
            }

            if (CreateStatus.CREATE_SUCCESS.equals(event.getStatus()) && event.getResult() != null) {
                log.info("Create statefulset {} in namespace {} successfully", statefulsetName, namespace);
                return (V1StatefulSet) event.getResult();
            }
        } finally {
            // always drop the pending event so a later call for the same key starts fresh
            factory.createEventContainers.remove(wrapStatefulSetName);
        }
        return null;
    }

    /**
     * Creates a deployment in the given namespace and waits on the matching create event
     * (presumably released by the deployment informer's handler).
     *
     * <p>A pending create event is registered under the key built by {@code InformerUtils.wrap}
     * from namespace and deployment name (or reused if one is already pending) before the API
     * call is issued, and is always removed again before this method returns.
     *
     * @param namespace namespace to create the deployment in
     * @param deployment deployment definition; its metadata name identifies the pending event
     * @return the result carried by the event when it ends in {@code CREATE_SUCCESS} with a non-null
     *     result, otherwise {@code null}
     * @throws InformerException if the API call fails, the wait is interrupted, the informer future
     *     completes exceptionally, or the event latch is not released within
     *     {@code KubernetesConfiguration.informerWaitMs} milliseconds
     */
    @Override
    public V1Deployment createDeployment(String namespace, V1Deployment deployment)
        throws InformerException {
        CompletableFuture<Void> deploymentFuture = factory.getOrAddDeploymentInformer();
        String deploymentName = deployment.getMetadata().getName();
        String wrapDeploymentName = InformerUtils.wrap(namespace, deploymentName);

        InformerCreateEvent event = factory.createEventContainers.computeIfAbsent(wrapDeploymentName,
            (key) -> InformerCreateEvent.create(wrapDeploymentName, InformerEventType.DEPLOYMENT));

        AtomicBoolean finished = new AtomicBoolean(false);
        try {
            // thenAccept only runs the callback inline when the informer future is already done;
            // join() so the checks below always observe the callback's outcome instead of racing it.
            deploymentFuture.thenAccept(deploymentInformer -> {
                try {
                    // create deployment and wait for success
                    kubernetesClient.createDeployment(namespace, deployment);
                    boolean result =
                        event.getLatch().await(KubernetesConfiguration.informerWaitMs, TimeUnit.MILLISECONDS);
                    finished.set(result);
                } catch (ApiException | InterruptedException e) {
                    if (e instanceof InterruptedException) {
                        // restore the interrupt flag so code further up the stack can still see it
                        Thread.currentThread().interrupt();
                    }
                    log.error("create deployment {} error with response {}", deploymentName,
                        e.getMessage(), e);
                    errorCreate(event, e);
                }
            }).exceptionally(exception -> {
                errorCreate(event, exception);
                return null;
            }).join();

            if (event.getException() != null) {
                log.error("Failed to create deployment {} in namespace {}", deploymentName, namespace);
                throw event.getException();
            }

            if (!finished.get()) {
                throw new InformerException(
                    String.format("Failed completing create deployment %s within %s", deploymentName,
                        KubernetesConfiguration.informerWaitMs));
            }

            if (CreateStatus.CREATE_SUCCESS.equals(event.getStatus()) && event.getResult() != null) {
                log.info("Create deployment {} in namespace {} successfully", deploymentName, namespace);
                return (V1Deployment) event.getResult();
            }
        } finally {
            // always drop the pending event so a later call for the same key starts fresh
            factory.createEventContainers.remove(wrapDeploymentName);
        }
        return null;
    }

    /**
     * Patches a deployment and waits on the event registered for it.
     *
     * <p>The pending event is kept in the create-event container under the key built by
     * {@code InformerUtils.wrap} from namespace and deployment name, and success is detected via
     * {@code CREATE_SUCCESS}. NOTE(review): whether the deployment informer releases a create
     * event when an existing deployment is patched cannot be seen from here; confirm against the
     * informer handler.
     *
     * @param namespace namespace the deployment lives in
     * @param jsonPatch patch document wrapped into a {@code V1Patch} body
     * @param deploymentName name of the deployment to patch
     * @return the result carried by the event when it ends in {@code CREATE_SUCCESS} with a non-null
     *     result, otherwise {@code null}
     * @throws InformerException if the API call fails, the wait is interrupted, the informer future
     *     completes exceptionally, or the event latch is not released within
     *     {@code KubernetesConfiguration.informerWaitMs} milliseconds
     */
    @Override
    public V1Deployment patchDeployment(String namespace, String jsonPatch, String deploymentName)
        throws InformerException {
        CompletableFuture<Void> deploymentFuture = factory.getOrAddDeploymentInformer();
        String wrapDeploymentName = InformerUtils.wrap(namespace, deploymentName);

        InformerCreateEvent event = factory.createEventContainers.computeIfAbsent(wrapDeploymentName,
            (key) -> InformerCreateEvent.create(wrapDeploymentName, InformerEventType.DEPLOYMENT));

        AtomicBoolean finished = new AtomicBoolean(false);
        try {
            // thenAccept only runs the callback inline when the informer future is already done;
            // join() so the checks below always observe the callback's outcome instead of racing it.
            deploymentFuture.thenAccept(deploymentInformer -> {
                try {
                    // patch deployment and wait for the event to be released
                    V1Patch body = new V1Patch(jsonPatch);
                    kubernetesClient.patchDeployment(deploymentName, namespace, body);
                    boolean result =
                        event.getLatch().await(KubernetesConfiguration.informerWaitMs, TimeUnit.MILLISECONDS);
                    finished.set(result);
                } catch (ApiException | InterruptedException e) {
                    if (e instanceof InterruptedException) {
                        // restore the interrupt flag so code further up the stack can still see it
                        Thread.currentThread().interrupt();
                    }
                    log.error("patch deployment {} error with response {}", deploymentName,
                        e.getMessage(), e);
                    errorCreate(event, e);
                }
            }).exceptionally(exception -> {
                errorCreate(event, exception);
                return null;
            }).join();

            if (event.getException() != null) {
                log.error("Failed to patch deployment {} in namespace {}", deploymentName, namespace);
                throw event.getException();
            }

            if (!finished.get()) {
                throw new InformerException(
                    String.format("Failed completing patch deployment %s within %s", deploymentName,
                        KubernetesConfiguration.informerWaitMs));
            }

            if (CreateStatus.CREATE_SUCCESS.equals(event.getStatus()) && event.getResult() != null) {
                log.info("patch deployment {} in namespace {} successfully", deploymentName, namespace);
                return (V1Deployment) event.getResult();
            }
        } finally {
            // always drop the pending event so a later call for the same key starts fresh
            factory.createEventContainers.remove(wrapDeploymentName);
        }
        return null;

    }

    /**
     * Patches a stateful set and then polls until it reports at least one ready replica.
     *
     * <p>NOTE(review): this method returns {@code false} on every path that returns normally,
     * regardless of the polling outcome; how {@code WaitUtil.poll} signals a timeout is not
     * visible here, so the return value is left unchanged. Confirm what callers expect.
     *
     * @param namespace namespace the stateful set lives in
     * @param jsonPatch patch document wrapped into a {@code V1Patch} body
     * @param statefulSetName name of the stateful set to patch
     * @return always {@code false}
     * @throws ApiException if the patch call fails
     */
    @Override
    public boolean patchStatefulSet(String namespace, String jsonPatch, String statefulSetName)
        throws ApiException {
        V1Patch body = new V1Patch(jsonPatch);
        kubernetesClient.patchStatefulSet(statefulSetName, namespace, body);
        // Wait until statefulset has stabilized after rollout restart
        // (presumably a 3s poll interval and a 60s overall timeout; verify against WaitUtil.poll)
        WaitUtil.poll(
            Duration.ofSeconds(3),
            Duration.ofSeconds(60),
            () -> {
                log.info("Waiting until statefulset {} restarted successfully.", statefulSetName);
                Optional<V1StatefulSet> statefulSetOptional =
                    kubernetesClient.readNamespacedStatefulSet(namespace, statefulSetName);
                // Status and readyReplicas may be null while no replica is ready yet; treat that as
                // "not ready" instead of failing on a null dereference or unboxing.
                return statefulSetOptional
                    .map(V1StatefulSet::getStatus)
                    .map(status -> status.getReadyReplicas())
                    .filter(readyReplicas -> readyReplicas > 0)
                    .isPresent();
            });
        return false;
    }

    /**
     * Deletes a stateful set from the given namespace and waits on the matching delete event
     * (presumably released by the stateful-set informer's handler).
     *
     * <p>A pending delete event is registered under the key built by {@code InformerUtils.wrap}
     * from namespace and stateful set name (or reused if one is already pending) before the API
     * call is issued, and is always removed again before this method returns.
     *
     * @param namespace namespace the stateful set lives in
     * @param statefulsetName name of the stateful set to delete
     * @return the result carried by the event when it ends in {@code DELETE_SUCCESS} with a non-null
     *     result, otherwise {@code null}
     * @throws InformerException if the API call fails, the wait is interrupted, the informer future
     *     completes exceptionally, or the event latch is not released within
     *     {@code KubernetesConfiguration.informerWaitMs} milliseconds
     */
    @Override
    public V1StatefulSet deleteStatefulSet(String namespace, String statefulsetName)
        throws InformerException {
        CompletableFuture<Void> ssFuture = factory.getOrAddStatefulSetInformer();
        String wrapStatefulSetName = InformerUtils.wrap(namespace, statefulsetName);

        InformerDeleteEvent event = factory.deleteEventContainers.computeIfAbsent(wrapStatefulSetName,
            (ssName) -> InformerDeleteEvent.create(wrapStatefulSetName, InformerEventType.STATEFUL_SET));

        AtomicBoolean finished = new AtomicBoolean(false);
        try {
            // thenAccept only runs the callback inline when the informer future is already done;
            // join() so the checks below always observe the callback's outcome instead of racing it.
            ssFuture.thenAccept(ssInformer -> {
                try {
                    // delete statefulset and wait for success
                    kubernetesClient.deleteStatefulSet(namespace, statefulsetName, DELETE_OPTIONS);
                    boolean result =
                        event.getLatch().await(KubernetesConfiguration.informerWaitMs, TimeUnit.MILLISECONDS);
                    finished.set(result);
                } catch (ApiException | InterruptedException e) {
                    if (e instanceof InterruptedException) {
                        // restore the interrupt flag so code further up the stack can still see it
                        Thread.currentThread().interrupt();
                    }
                    log.error("delete statefulset {} error with response {}", statefulsetName,
                        e.getMessage(), e);
                    errorDelete(event, e);
                }
            }).exceptionally(exception -> {
                errorDelete(event, exception);
                return null;
            }).join();

            if (event.getException() != null) {
                log.error("Failed to delete statefulset {} in namespace {}", statefulsetName, namespace);
                throw event.getException();
            }

            if (!finished.get()) {
                throw new InformerException(
                    String.format("Failed completing delete statefulset %s within %s", statefulsetName,
                        KubernetesConfiguration.informerWaitMs));
            }

            if (DeleteStatus.DELETE_SUCCESS.equals(event.getStatus()) && event.getResult() != null) {
                log.info("Delete statefulset {} in namespace {} successfully", statefulsetName, namespace);
                return (V1StatefulSet) event.getResult();
            }
        } finally {
            // always drop the pending event so a later call for the same key starts fresh
            factory.deleteEventContainers.remove(wrapStatefulSetName);
        }
        return null;
    }

    /**
     * Deletes a deployment from the given namespace and waits on the matching delete event
     * (presumably released by the deployment informer's handler).
     *
     * <p>A pending delete event is registered under the key built by {@code InformerUtils.wrap}
     * from namespace and deployment name (or reused if one is already pending) before the API
     * call is issued, and is always removed again before this method returns.
     *
     * @param namespace namespace the deployment lives in
     * @param deploymentName name of the deployment to delete
     * @return the result carried by the event when it ends in {@code DELETE_SUCCESS} with a non-null
     *     result, otherwise {@code null}
     * @throws InformerException if the API call fails, the wait is interrupted, the informer future
     *     completes exceptionally, or the event latch is not released within
     *     {@code KubernetesConfiguration.informerWaitMs} milliseconds
     */
    @Override
    public V1Deployment deleteDeployment(String namespace, String deploymentName)
        throws InformerException {
        CompletableFuture<Void> deploymentFuture = factory.getOrAddDeploymentInformer();
        String wrapDeploymentName = InformerUtils.wrap(namespace, deploymentName);

        InformerDeleteEvent event = factory.deleteEventContainers.computeIfAbsent(wrapDeploymentName,
            (key) -> InformerDeleteEvent.create(wrapDeploymentName, InformerEventType.DEPLOYMENT));

        AtomicBoolean finished = new AtomicBoolean(false);
        try {
            // thenAccept only runs the callback inline when the informer future is already done;
            // join() so the checks below always observe the callback's outcome instead of racing it.
            deploymentFuture.thenAccept(deploymentInformer -> {
                try {
                    // delete deployment and wait for success
                    kubernetesClient.deleteDeployment(namespace, deploymentName, DELETE_OPTIONS);
                    boolean result =
                        event.getLatch().await(KubernetesConfiguration.informerWaitMs, TimeUnit.MILLISECONDS);
                    finished.set(result);
                } catch (ApiException | InterruptedException e) {
                    if (e instanceof InterruptedException) {
                        // restore the interrupt flag so code further up the stack can still see it
                        Thread.currentThread().interrupt();
                    }
                    log.error("delete deployment {} error with response {}", deploymentName,
                        e.getMessage(), e);
                    errorDelete(event, e);
                }
            }).exceptionally(exception -> {
                errorDelete(event, exception);
                return null;
            }).join();

            if (event.getException() != null) {
                log.error("Failed to delete deployment {} in namespace {}", deploymentName, namespace);
                throw event.getException();
            }

            if (!finished.get()) {
                throw new InformerException(
                    String.format("Failed completing delete deployment %s within %s", deploymentName,
                        KubernetesConfiguration.informerWaitMs));
            }

            if (DeleteStatus.DELETE_SUCCESS.equals(event.getStatus()) && event.getResult() != null) {
                log.info("Delete deployment {} in namespace {} successfully", deploymentName, namespace);
                return (V1Deployment) event.getResult();
            }
        } finally {
            // always drop the pending event so a later call for the same key starts fresh
            factory.deleteEventContainers.remove(wrapDeploymentName);
        }
        return null;
    }

    /**
     * Creates a config map in the given namespace and waits on the matching create event
     * (presumably released by the config-map informer's handler).
     *
     * <p>A pending create event is registered under the key built by {@code InformerUtils.wrap}
     * from namespace and config map name (or reused if one is already pending) before the API
     * call is issued, and is always removed again before this method returns.
     *
     * @param namespace namespace to create the config map in
     * @param configmap config map definition; its metadata name identifies the pending event
     * @return the result carried by the event when it ends in {@code CREATE_SUCCESS} with a non-null
     *     result, otherwise {@code null}
     * @throws InformerException if the API call fails, the wait is interrupted, the informer future
     *     completes exceptionally, or the event latch is not released within
     *     {@code KubernetesConfiguration.informerWaitMs} milliseconds
     */
    @Override
    public V1ConfigMap createConfigMap(String namespace, V1ConfigMap configmap)
        throws InformerException {
        CompletableFuture<Void> cmFuture = factory.getOrAddConfigMapInformer();
        String configmapName = configmap.getMetadata().getName();
        String wrapConfigMapName = InformerUtils.wrap(namespace, configmapName);

        InformerCreateEvent event = factory.createEventContainers.computeIfAbsent(wrapConfigMapName,
            (cmName) -> InformerCreateEvent.create(wrapConfigMapName, InformerEventType.CONFIG_MAP));

        AtomicBoolean finished = new AtomicBoolean(false);
        try {
            // thenAccept only runs the callback inline when the informer future is already done;
            // join() so the checks below always observe the callback's outcome instead of racing it.
            cmFuture.thenAccept(cmInformer -> {
                try {
                    // create configmap and wait for success
                    kubernetesClient.createConfigMap(namespace, configmap);
                    boolean result =
                        event.getLatch().await(KubernetesConfiguration.informerWaitMs, TimeUnit.MILLISECONDS);
                    finished.set(result);
                } catch (ApiException | InterruptedException e) {
                    if (e instanceof InterruptedException) {
                        // restore the interrupt flag so code further up the stack can still see it
                        Thread.currentThread().interrupt();
                    }
                    log.error("create configmap {} error with response {}", configmapName,
                        e.getMessage(), e);
                    errorCreate(event, e);
                }
            }).exceptionally(exception -> {
                errorCreate(event, exception);
                return null;
            }).join();

            if (event.getException() != null) {
                log.error("Failed to create configmap {} in namespace {}", configmapName, namespace);
                throw event.getException();
            }

            if (!finished.get()) {
                throw new InformerException(String.format("Failed completing create configmap %s within %s",
                    configmapName, KubernetesConfiguration.informerWaitMs));
            }

            if (CreateStatus.CREATE_SUCCESS.equals(event.getStatus()) && event.getResult() != null) {
                log.info("Create configmap {} in namespace {} successfully", configmapName, namespace);
                return (V1ConfigMap) event.getResult();
            }
        } finally {
            // always drop the pending event so a later call for the same key starts fresh
            factory.createEventContainers.remove(wrapConfigMapName);
        }
        return null;
    }

    /**
     * Deletes a config map from the given namespace and waits on the matching delete event
     * (presumably released by the config-map informer's handler).
     *
     * <p>A pending delete event is registered under the key built by {@code InformerUtils.wrap}
     * from namespace and config map name (or reused if one is already pending) before the API
     * call is issued, and is always removed again before this method returns.
     *
     * @param namespace namespace the config map lives in
     * @param configmapName name of the config map to delete
     * @return the result carried by the event when it ends in {@code DELETE_SUCCESS} with a non-null
     *     result, otherwise {@code null}
     * @throws InformerException if the API call fails, the wait is interrupted, the informer future
     *     completes exceptionally, or the event latch is not released within
     *     {@code KubernetesConfiguration.informerWaitMs} milliseconds
     */
    @Override
    public V1ConfigMap deleteConfigMap(String namespace, String configmapName)
        throws InformerException {
        CompletableFuture<Void> cmFuture = factory.getOrAddConfigMapInformer();
        String wrapConfigMapName = InformerUtils.wrap(namespace, configmapName);

        InformerDeleteEvent event = factory.deleteEventContainers.computeIfAbsent(wrapConfigMapName,
            (cmName) -> InformerDeleteEvent.create(wrapConfigMapName, InformerEventType.CONFIG_MAP));

        AtomicBoolean finished = new AtomicBoolean(false);
        try {
            // thenAccept only runs the callback inline when the informer future is already done;
            // join() so the checks below always observe the callback's outcome instead of racing it.
            cmFuture.thenAccept(cmInformer -> {
                try {
                    // delete configmap and wait for success
                    kubernetesClient.deleteConfigMap(namespace, configmapName, DELETE_OPTIONS);
                    boolean result =
                        event.getLatch().await(KubernetesConfiguration.informerWaitMs, TimeUnit.MILLISECONDS);
                    finished.set(result);
                } catch (ApiException | InterruptedException e) {
                    if (e instanceof InterruptedException) {
                        // restore the interrupt flag so code further up the stack can still see it
                        Thread.currentThread().interrupt();
                    }
                    log.error("delete configmap {} error with response {}", configmapName,
                        e.getMessage(), e);
                    errorDelete(event, e);
                }
            }).exceptionally(exception -> {
                errorDelete(event, exception);
                return null;
            }).join();

            if (event.getException() != null) {
                log.error("Failed to delete configmap {} in namespace {}", configmapName, namespace);
                throw event.getException();
            }

            if (!finished.get()) {
                throw new InformerException(String.format("Failed completing delete configmap %s within %s",
                    configmapName, KubernetesConfiguration.informerWaitMs));
            }

            if (DeleteStatus.DELETE_SUCCESS.equals(event.getStatus()) && event.getResult() != null) {
                log.info("Delete configmap {} in namespace {} successfully", configmapName, namespace);
                return (V1ConfigMap) event.getResult();
            }
        } finally {
            // always drop the pending event so a later call for the same key starts fresh
            factory.deleteEventContainers.remove(wrapConfigMapName);
        }
        return null;
    }

    /**
     * Marks a create event as failed.
     *
     * <p>The cause is wrapped in an {@link InformerException} and stored on the event, the status is
     * set to {@code CREATE_ERROR}, and {@code release()} is invoked on the event as the last step.
     *
     * @param event the create event to mark as failed
     * @param e     the cause of the failure
     */
    private void errorCreate(InformerCreateEvent event, Throwable e) {
        final InformerException failure = new InformerException(e);
        event.setException(failure);
        event.setStatus(CreateStatus.CREATE_ERROR);
        // released only after exception and status have been recorded
        event.release();
    }

    /**
     * Marks a delete event as failed.
     *
     * <p>The cause is wrapped in an {@link InformerException} and stored on the event, the status is
     * set to {@code DELETE_ERROR}, and {@code release()} is invoked on the event as the last step.
     *
     * @param event the delete event to mark as failed
     * @param e     the cause of the failure
     */
    private void errorDelete(InformerDeleteEvent event, Throwable e) {
        final InformerException failure = new InformerException(e);
        event.setException(failure);
        event.setStatus(DeleteStatus.DELETE_ERROR);
        // released only after exception and status have been recorded
        event.release();
    }

    /**
     * Returns the informer factory held by this instance.
     *
     * @return the {@code factory} field
     */
    @Override
    public InformerFactoryImpl getInformerFactory() {
        return factory;
    }

    /**
     * Checks the health of the resources in a namespace.
     *
     * <p>The namespace check runs first; the statefulset check only runs when the namespace check
     * passed. A failing check is logged at info level.
     *
     * @param namespace namespace to check
     * @return {@code true} only when both the namespace check and the statefulset check pass
     * @throws ApiException if one of the underlying checks throws it
     */
    @Override
    public boolean checkResourceHealthStatus(String namespace) throws ApiException {
        final boolean namespaceHealthy = checkNamespaceHealthStatus(namespace);
        if (namespaceHealthy) {
            final boolean statefulSetsHealthy = checkStatefulSetHealthStatus(namespace);
            if (!statefulSetsHealthy) {
                log.info("checkStatefulSetHealthStatus fail {}", namespace);
            }
            return statefulSetsHealthy;
        }
        log.info("checkNamespaceHealthStatus fail {}", namespace);
        return false;
    }

    /**
     * Checks the status of a namespace.
     *
     * <p>Namespaces are listed with {@code K8sModelConstant.LABEL_SELECTOR} and the first one whose
     * name equals {@code namespace} is inspected. Items without metadata or name are skipped, and a
     * namespace without status or phase counts as not healthy instead of raising a
     * {@link NullPointerException}.
     *
     * @param namespace name of the namespace to look up
     * @return {@code true} if the namespace is found and its phase is "Active", {@code false}
     *     otherwise
     * @throws ApiException if listing the namespaces fails
     */
    private boolean checkNamespaceHealthStatus(String namespace) throws ApiException {
        return kubernetesClient.listNamespace(K8sModelConstant.LABEL_SELECTOR).getItems().stream()
            .filter(ns -> ns.getMetadata() != null
                && ns.getMetadata().getName() != null
                && ns.getMetadata().getName().equals(namespace))
            .findFirst()
            // Optional.map turns a null status or phase into an empty Optional
            .map(ns -> ns.getStatus())
            .map(status -> status.getPhase())
            .filter("Active"::equals)
            .isPresent();
    }

    /**
     * Checks the statefulsets of a namespace.
     *
     * <p>The statefulsets are listed in a callback chained on the statefulset informer future, and
     * each one is tested with {@link #checkNotReadyReplicas(V1StatefulSet)}. The outcome flag is read
     * immediately after the callback has been registered.
     *
     * @param namespace namespace whose statefulsets are checked
     * @return {@code false} if at least one listed statefulset is reported as not ready;
     *     {@code true} otherwise, which includes the cases where listing throws an
     *     {@link ApiException}, where the informer future completes exceptionally, and where the
     *     callback has not run by the time the flag is read
     * @throws ApiException declared by the signature; the body itself catches and logs it
     */
    private boolean checkStatefulSetHealthStatus(String namespace) throws ApiException {
        // positive flag: stays true unless the callback finds a not-ready statefulset
        final AtomicBoolean allReady = new AtomicBoolean(true);

        factory.getOrAddStatefulSetInformer()
            .thenAccept(ssInformer -> {
                try {
                    List<V1StatefulSet> items = kubernetesClient.listStatefulSet(namespace).getItems();
                    boolean noneNotReady = items.stream().noneMatch(this::checkNotReadyReplicas);
                    allReady.set(noneNotReady);
                } catch (ApiException e) {
                    log.error("check statefulset health status error {}", e.getResponseBody());
                    // a failed listing is reported as healthy
                    allReady.set(true);
                }
            })
            .exceptionally(exception -> {
                log.error("Failed to check statefulset health status in namespace {}", namespace, exception);
                return null;
            });

        return allReady.get();
    }

    /**
     * Tells whether a statefulset has no ready replica.
     *
     * @param ss statefulset to inspect
     * @return {@code true} if the status or its ready-replica count is missing (an error is logged
     *     in that case) or if the ready-replica count is below 1; {@code false} otherwise
     */
    private boolean checkNotReadyReplicas(V1StatefulSet ss) {
        // null when either the status or the ready-replica count is absent
        final Integer readyReplicas = ss.getStatus() == null ? null : ss.getStatus().getReadyReplicas();
        if (readyReplicas == null) {
            log.error("statefulset {} whose status is null", ss.getMetadata().getName());
            return true;
        }
        return readyReplicas < 1;
    }

}
